package com.tdy.cdc.app.dwd;

import com.tdy.cdc.app.BaseSqlApp;
import com.tdy.cdc.common.Constant;
import com.tdy.cdc.util.FlinkSinkUtil;
import com.tdy.cdc.util.FlinkSourceUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @author NanHuang
 * @Date 2023/3/14
 */
public class App13_DwdMhsIptReliefShiftRecord extends BaseSqlApp {
    public static void main(String[] args) {
        new App13_DwdMhsIptReliefShiftRecord().init(
                5013,
                "App13_DwdMhsIptReliefShiftRecord",
                2
        );
    }

    @Override
    protected void invoke(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) {
        // 1 创建源表
        createSourceTables(tableEnv);
        // 2 join
        joinTables(tableEnv);
        // 3 写入Kafka
        writeToKafka(tableEnv);
    }

    private void writeToKafka(StreamTableEnvironment tableEnv) {
        // create kafka mappings
        tableEnv.executeSql("CREATE TABLE target ("+
                "    RECORD_ID                             STRING     "+
                "    ,HOSPITAL_NAME                         STRING     "+
                "    ,HOSPITAL_CODE                         STRING     "+
                "    ,AGE_UNIT                              STRING     "+
                "    ,AGE                                   STRING     "+
                "    ,SEX_NAME                              STRING     "+
                "    ,SEX_CODE                              STRING     "+
                "    ,ID_CARD_NUMBER                        STRING     "+
                "    ,PATIENT_VISIT_CATEGORY_CODE           STRING     "+
                "    ,PATIENT_VISIT_CATEGORY                STRING     "+
                "    ,INPATIENT_VISIT_FLOW_ID               STRING     "+
                "    ,INPATIENT_NO                          STRING     "+
                "    ,PATIENT_ID                            STRING     "+
                "    ,PATIENT_NAME                          STRING     "+
                "    ,DOMAIN_ID                             STRING     "+
                "    ,VISIT_DATE                            STRING     "+
                "    ,VISIT_TIMES                           STRING     "+
                "    ,BIRTH_DATE                            STRING     "+
                "    ,AUTHOR_CODE                           STRING     "+
                "    ,AUTHOR_NAME                           STRING     "+
                "    ,CUSTODIAN_NAME                        STRING     "+
                "    ,CUSTODIAN_CODE                        STRING     "+
                "    ,TURN_DATE                             STRING     "+
                "    ,TURN_NAME                             STRING     "+
                "    ,TURN_CODE                             STRING     "+
                "    ,SUCCESSION_DATE                       STRING     "+
                "    ,SUCCESSOR_CODE                        STRING     "+
                "    ,SUCCESSOR_SIGN                        STRING     "+
                "    ,ADMIT_DATETIME                        STRING     "+
                "    ,BED_NO                                STRING     "+
                "    ,BED_NAME                              STRING     "+
                "    ,ROOM_NO                               STRING     "+
                "    ,ROOM_NAME                             STRING     "+
                "    ,WARD_ID                               STRING     "+
                "    ,WARD_NAME                             STRING     "+
                "    ,DEPT_ID                               STRING     "+
                "    ,DEPT_NAME                             STRING     "+
                "    ,CHIEF_COMPLAINT                       STRING     "+
                "    ,ADMIT_CONDITION                       STRING     "+
                "    ,CURRENT_SITUATION                     STRING     "+
                "    ,CHINA_MED_OBSERVE_RESULT              STRING     "+
                "    ,SUCCESSION_TREATMENT_PLAN             STRING     "+
                "    ,THERAPEUTIC_PRINCIPLE                 STRING     "+
                "    ,NEEDING_ATTENTION                     STRING     "+
                "    ,TREAT_PROCESS_DESC                    STRING     "+
                "    ,ADMIT_DIAG_ICD_CODE                  STRING     "+
                "    ,ADMIT_DIAG_ICD_NAME                  STRING     "+
                "    ,ADMIT_CHINA_DISEASE_CODE             STRING     "+
                "    ,ADMIT_CHINA_DISEASE_NAME             STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_NAME            STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_CODE            STRING     "+
                "    ,PRESENT_DIAG_ICD_CODE                STRING     "+
                "    ,PRESENT_DIAG_ICD_NAME                STRING     "+
                "    ,PRESENT_CHINA_DISEASE_CODE           STRING     "+
                "    ,PRESENT_CHINA_DISEASE_NAME           STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_CODE          STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_NAME          STRING     " +
                "    ,VISIT_ID                             STRING     "+
                "    ,PRIMARY KEY (RECORD_ID) NOT ENFORCED            "+
                ") "+ FlinkSinkUtil.getUpsertKafkaWith(Constant.TOPIC_DWD_MHS_IPT_RELIEF_SHIFT_RECORD));
        // load data
        tableEnv.executeSql("insert into target select * from join_result");
    }

    private void joinTables(StreamTableEnvironment tableEnv) {
        // 1 设置数据的TTL（暂时10min）
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(30 * 60));
        // 2 join操作
        Table joinResult = tableEnv.sqlQuery("select a.unique_id    AS    RECORD_ID                                "+
                "    ,a.hospital_name                AS      HOSPITAL_NAME                                                            "+
                "    ,a.hospital_code                AS      HOSPITAL_CODE                                                            "+
                "    ,a.age_unit                     AS      AGE_UNIT                                                                 "+
                "    ,a.age                          AS      AGE                                                                      "+
                "    ,a.sex                          AS      SEX_NAME                                                                 "+
                "    ,a.sex_code                     AS      SEX_CODE                                                                 "+
                "    ,a.identify_no                  AS      ID_CARD_NUMBER                                                           "+
                "    ,a.patient_typecode             AS      PATIENT_VISIT_CATEGORY_CODE                                              "+
                "    ,a.patient_type                 AS      PATIENT_VISIT_CATEGORY                                                   "+
                "    ,a.visit_id                     AS      INPATIENT_VISIT_FLOW_ID                                                  "+
                "    ,a.inpatient                    AS      INPATIENT_NO                                                             "+
                "    ,a.patient_id                   AS      PATIENT_ID                                                               "+
                "    ,a.patient_name                 AS      PATIENT_NAME                                                             "+
                "    ,a.domain_id                    AS      DOMAIN_ID                                                                "+
                "    ,a.visit_date                   AS      VISIT_DATE                                                               "+
                "    ,a.visit_times                  AS      VISIT_TIMES                                                              "+
                "    ,a.birth                        AS      BIRTH_DATE                                                               "+
                "    ,a.author_code                  AS      AUTHOR_CODE                                                              "+
                "    ,a.author_name                  AS      AUTHOR_NAME                                                              "+
                "    ,a.custodian_code               AS      CUSTODIAN_NAME                                                           "+
                "    ,a.custodian_name               AS      CUSTODIAN_CODE                                                           "+
                "    ,a.turn_data                    AS      TURN_DATE                                                                "+
                "    ,a.turn_name                    AS      TURN_NAME                                                                "+
                "    ,a.turn_code                    AS      TURN_CODE                                                                "+
                "    ,a.succession_date              AS      SUCCESSION_DATE                                                          "+
                "    ,a.successor_code               AS      SUCCESSOR_CODE                                                           "+
                "    ,a.successor_sign               AS      SUCCESSOR_SIGN                                                           "+
                "    ,a.admission_date               AS      ADMIT_DATETIME                                                           "+
                "    ,a.bed_no                       AS      BED_NO                                                                   "+
                "    ,a.bed_name                     AS      BED_NAME                                                                 "+
                "    ,a.ward_id                      AS      ROOM_NO                                                                  "+
                "    ,a.ward_name                    AS      ROOM_NAME                                                                "+
                "    ,a.wards_id                     AS      WARD_ID                                                                  "+
                "    ,a.wards_name                   AS      WARD_NAME                                                                "+
                "    ,a.dept_code                    AS      DEPT_ID                                                                  "+
                "    ,a.dept_name                    AS      DEPT_NAME                                                                "+
                "    ,a.chief_complaint              AS      CHIEF_COMPLAINT                                                          "+
                "    ,a.admission_status             AS      ADMIT_CONDITION                                                          "+
                "    ,a.current_situation            AS      CURRENT_SITUATION                                                        "+
                "    ,a.tcm_four_diagnosis           AS      CHINA_MED_OBSERVE_RESULT                                                 "+
                "    ,a.succession_treatment_plan    AS      SUCCESSION_TREATMENT_PLAN                                                "+
                "    ,a.principle_and_method         AS      THERAPEUTIC_PRINCIPLE                                                    "+
                "    ,a.needing_attention            AS      NEEDING_ATTENTION                                                        "+
                "    ,a.treatment_process            AS      TREAT_PROCESS_DESC                                                       "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_code else '' end     AS     ADMIT_DIAG_ICD_CODE              "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_name else '' end     AS     ADMIT_DIAG_ICD_NAME              "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_code else '' end     AS     ADMIT_CHINA_DISEASE_CODE         "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_name else '' end     AS     ADMIT_CHINA_DISEASE_NAME         "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_code else '' end     AS     ADMIT_CHINA_SYNDROME_NAME        "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_name else '' end     AS     ADMIT_CHINA_SYNDROME_CODE        "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_code else '' end     AS     PRESENT_DIAG_ICD_CODE            "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_name else '' end     AS     PRESENT_DIAG_ICD_NAME            "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_code else '' end     AS     PRESENT_CHINA_DISEASE_CODE       "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_name else '' end     AS     PRESENT_CHINA_DISEASE_NAME       "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_code else '' end     AS     PRESENT_CHINA_SYNDROME_CODE      "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_name else '' end     AS     PRESENT_CHINA_SYNDROME_NAME      " +
                "    ,visit_id AS VISIT_ID "+
                " from hdsd00_14_09 a                                                                                                 "+
                " left join tdy_list_diag b                                                                                           "+
                " on a.unique_id = b.unique_id and a.xds_type = b.xds_type                                                          ");
        tableEnv.createTemporaryView("join_result",joinResult);

    }

    private void createSourceTables(StreamTableEnvironment tableEnv) {
        // create table : hdsd00_14_09
        tableEnv.executeSql("create table hdsd00_14_09 (       "+
                "pk                              string," +
                "upload_time                     string," +
                "status                          string," +
                "empi                            string," +
                "encounter_id                    string," +
                "visit_date                      string," +
                "patient_id                      string," +
                "patient_domain                  string," +
                "visit_domain                    string," +
                "visit_id                        string," +
                "visit_times                     string," +
                "unique_id                       string," +
                "xds_type                        string," +
                "xds_name                        string," +
                "effective_time                  string," +
                "xds_version                     string," +
                "domain_id                       string," +
                "patient_type                    string," +
                "patient_typecode                string," +
                "inpatient                       string," +
                "identify_no                     string," +
                "patient_name                    string," +
                "sex_code                        string," +
                "sex                             string," +
                "birth                           string," +
                "age                             string," +
                "age_unit                        string," +
                "write_time                      string," +
                "author_code                     string," +
                "author_name                     string," +
                "custodian_name                  string," +
                "custodian_code                  string," +
                "turn_data                       string," +
                "turn_name                       string," +
                "turn_code                       string," +
                "succession_date                 string," +
                "successor_code                  string," +
                "successor_sign                  string," +
                "admission_date                  string," +
                "bed_no                          string," +
                "bed_name                        string," +
                "ward_id                         string," +
                "ward_name                       string," +
                "wards_id                        string," +
                "wards_name                      string," +
                "dept_code                       string," +
                "dept_name                       string," +
                "hospital_code                   string," +
                "hospital_name                   string," +
                "chief_complaint                 string," +
                "admission_status                string," +
                "current_situation               string," +
                "tcm_four_diagnosis              string," +
                "succession_treatment_plan       string," +
                "principle_and_method            string," +
                "needing_attention               string," +
                "treatment_process               string" +
                ")" + FlinkSourceUtil.getKafkaWith("hdsd00_14_09","App13_DwdMhsIptReliefShiftRecord"));

        // create table : tdy_list_diag
        super.readTdyListDiag(tableEnv,"App13_DwdMhsIptReliefShiftRecord");
    }
}
